package cn.doitedu.sql;

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: expose a MySQL database to Flink SQL through a {@link JdbcCatalog}, so that
 * existing tables can be queried without declaring them with DDL first.
 *
 * <p>All connection settings (host, credentials, checkpoint path) are hard-coded
 * demo values.
 */
public class Demo21_JdbcCatalog {

    /** Name under which the catalog is registered in the table environment. */
    private static final String REGISTERED_CATALOG = "xx_catalog";

    /** Database handed to the catalog constructor and later selected with {@code use}. */
    private static final String DATABASE = "doit47";

    /**
     * Entry point: sets up the environment, registers the JDBC catalog, switches to
     * it and prints the contents of {@code stu_info}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(buildEnvironment());

        // Register the catalog with the table environment.
        tableEnv.registerCatalog(REGISTERED_CATALOG, buildCatalog());

        // From here on, tables that live in MySQL can be queried directly.
        tableEnv.executeSql("use catalog " + REGISTERED_CATALOG);
        tableEnv.executeSql("use " + DATABASE);

        tableEnv.executeSql("select * from stu_info").print();
    }

    /**
     * Creates the stream execution environment with exactly-once checkpointing
     * (interval argument 5000), file-based checkpoint storage, and parallelism 1.
     */
    private static StreamExecutionEnvironment buildEnvironment() {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        streamEnv.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
        streamEnv.setParallelism(1);

        return streamEnv;
    }

    /**
     * Builds the JDBC catalog for the MySQL instance at {@code doitedu:3306}, using
     * this class's class loader.
     */
    private static JdbcCatalog buildCatalog() {
        final ClassLoader loader = Demo21_JdbcCatalog.class.getClassLoader();
        final String user = "root";
        final String password = "root";
        final String baseUrl = "jdbc:mysql://doitedu:3306/";

        return new JdbcCatalog(loader, "jdbc", DATABASE, user, password, baseUrl);
    }
}
